跳到主要内容

Java Netty 的基本使用

这篇博客只是记录一个 Netty 如何使用,简单的介绍了一下用到的组件,具体各个组件的联系看下一篇

编写 Hello World

先写一个 Hello World 再进行 Netty 的学习,不需要知道它是啥,照着贴就行了,主要是感受一下 Netty 的便捷

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.66.Final</version>
</dependency>

开发一个简单的服务端和客户端

  • 客户端向服务端发送 hello world
  • 服务端仅接收,不返回

编写服务端

public class HelloServer {
public static void main(String[] args) {
// 1、服务端启动器,负责装配 netty 组件,启动服务器
new ServerBootstrap()
// 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
.group(new NioEventLoopGroup())
// 3、选择服务器的 ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
// 4、child 负责处理读写,该方法决定了 child 执行哪些操作
// ChannelInitializer 处理器(仅执行一次)
// 它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
.childHandler(
// 这个 ChannelInitializer 可以分开理解
// Channel 代表和客户端进行数据读写的通道,Initializer 初始化
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 这里的 addLast 就是添加 Handler 处理器
// 5、SocketChannel的处理器,使用 StringDecoder 解码,就是把客户端穿过来的 ByteBuf 转换成 String
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 6、SocketChannel 的业务处理(自定义处理器),使用上一个处理器的处理结果,这里就把数据打印出来
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
// 7、ServerSocketChannel 绑定 8080 端口
}).bind(8080);
}
}

编写客户端

public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
// 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
.channel(NioSocketChannel.class)
// ChannelInitializer 处理器(仅执行一次)
// 它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出
channel.pipeline().addLast(new StringEncoder());
}
})
// 指定要连接的服务器和端口
.connect(new InetSocketAddress("localhost", 8080))
// Netty 中很多方法都是异步的,如 connect
// 这时需要使用 sync 方法等待 connect 建立连接完毕
.sync()
// 获取 channel 对象,它即为通道抽象,可以进行数据读写操作
.channel()
// 写入消息并清空缓冲区
.writeAndFlush("hello world");
}
}

Netty 的核心部件

Netty 由以下部件构成

  • Channel
  • 回调
  • Future
  • 事件和 ChannelHandler

Channel 通道

这个就不用过多介绍了,这个就是 NIO 的概念,它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的 IO 操作的程序组件)的开放连接,如读操作和写操作。

目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。

事件回调

Netty 在内部使用了回调来处理事件,当一个回调被触发时,相关的事件可以被一个 ChannelHandler 接口的实现处理。

如下代码所示:当一个新的连接已经被建立时,ChannelHandler 的 channelActive() 回调方法将会被调用,并将打印出一条信息。

// 这里调用的是 ChannelInboundHandlerAdapter 适配器(入站规则),它内部写好了大部分代码,使之无需从头开始写~
ChannelInboundHandlerAdapter channelHandler = new ChannelInboundHandlerAdapter() {
// 当一个新连接已经被建立时,channelActive 将会被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx.channel().remoteAddress() + " connected");
}
};

Future

这个 Future 顾名思义,就是将在未来的某个时候拿到结果。

这个 Future 不是 Netty 包下的,而是 JDK 提供的接口 java.util.concurrent.Future,但是 JDK 所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这样搞得非常繁琐的,所以 Netty 提供了它自己的实现 ChannelFuture,用于在执行异步操作的时候使用。

ChannelFuture 提供了几种额外的方法,这些方法使得我们能够注册一个或者多个 ChannelFutureListener 监听器实例。

监听器的回调方法 operationComplete(),将会在对应的操作完成时被调用。然后监听器可以判断该操作是成功地完成了还是出错了。如果是后者,我们可以检索产生的 Throwable。

简而言之,由 ChannelFutureListener 提供的通知机制消除了手动检查对应的操作是否完成的必要。

每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture,也就是说,它们都不会阻塞。正如前面所提到过的一样,Netty 完全是异步和事件驱动的。

使用示例:

// 首先创建一个通道(这个通道的选择之后讲)
Channel channel = ....;
// 异步的连接到远程节点(注意:这个方法不会阻塞,它会在后台被调用)
ChannelFuture future = channel.connect(new InetSocketAddress("127.0.0.1", 25));


// 给这个 future 注册一个监听器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 检查这个 ChannelFuture 的状态,如果操作是成功的,那么将数据写到 Channel 里面,否则处理里面的异常(Throwable)
if (future.isSuccess()) {
// 如果操作成功,则创建一个 ByteBuf 持有数据
ByteBuf buffer = Unpooled.copiedBuffer("Hello", Charset.defaultCharset());
// 再次将数据异步的发送到远程节点上去
ChannelFuture wf = future.channel().writeAndFlush(buffer);
// ...
} else {
// 否则就是发生错误了
Throwable cause = future.cause();
cause.printStackTrace();
}
}
});

事件和 ChannelHandler

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。这些动作可能是:

  • 记录日志
  • 数据转换
  • 流控制
  • 应用程序逻辑。

Netty 是一个网络编程框架,所以事件是按照它们与入站或出站数据流的相关性进行分类的。可能由入站数据或者相关的状态更改而触发的事件包括:

  • 连接已被激活或者连接失活;
  • 数据读取;
  • 用户事件;
  • 错误事件。

出站事件是未来将会触发的某个动作的操作结果,这些动作包括:

  • 打开或者关闭到远程节点的连接
  • 将数据写到或者冲刷到套接字。

每个事件都可以被分发给 channelHandler 类中的某个用户实现的方法。

Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议(如 HTTP 和 SSL/TLS)的 ChannelHandler。

在内部,ChannelHandler 自己也使用了事件和 Future,使得它们也成为了你的应用程序将使用的相同抽象的消费者。

补充:Netty 内部的工作原理

Netty 通过触发事件将 Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件,包括:

  • 注册感兴趣的事件;
  • 将事件派发给 ChannelHandler;
  • 安排进一步的动作。

EventLoop 本身只由一个线程驱动,其处理了一个 Channel 的所有 I/O 事件,并且在该 EventLoop 的整个生命周期内都不会改变。

后面会细讲这块。这里只需了解就行了~

编写服务端示例

所有的 Netty 服务器都需要以下两部分。

  • 至少一个 ChannelHandler,该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
  • 启动参数:这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上。

称其这个示例为 Echo 服务器

编写 ChannelHandler

因为这个 Echo 服务器会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口(入站处理器),用来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 ChannelInboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。

这个服务端需要操作的部分:

  • channelRead():对于每个传入的消息都要调用;
  • channelReadComplete():通知 ChannelInboundHandler 最后一次对 channelRead() 的调用是当前批量读取中的最后一条消息;
  • exceptionCaught():在读取操作期间,有异常抛出时会调用。
@ChannelHandler.Sharable  // 标识一个 ChannelHandler 可以被多个 Channel 安全的共享(如果不加则每次使用都得创建一个实例)
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// 取得入站消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received:" + in.toString(Charset.defaultCharset()));
// 再次将接收到的消息发回给发送者,而不冲刷出站消息
ctx.write(in);
}

/**
* 补充 channelReadComplete 和上面的 channelRead 区别
*
* eventLoop 被到来的数据唤醒后 read 数据并包装成 msg,然后将 msg 作为参数调用 channelRead 方法,
* 期间做个判断,read 到 0 个字节或者是 read 到的字节数小于 buffer 的容量(就是只在该条消息最后一次读取完成的时候调用),
* 满足以上条件就会调用 channelReadComplete 方法。
*
* 因为 ByteBuf 是有长度限制的,所以超长了,就会多次读取,也就是调用多次 channelRead,而 channelReadComplete
* 则是每条消息只会调用一次,无论你多长,分多少次读取,只在该条消息最后一次读取完成的时候调用,所以这段代码把关闭 Channel
* 的操作放在 channelReadComplete 里,放到 channelRead 里可能消息太长了,结果第一次读完就关掉连接了,后面的消息全丢了。
*
* 具体区别参考:
* https://segmentfault.com/q/1010000018753423
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
// 这个 ChannelFutureListener.CLOSE 内部执行的就是 future.channel().close();
.addListener(ChannelFutureListener.CLOSE); // 将未处理的消息冲刷到远程节点,并关闭该 Channel
}

// 如果出现异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close(); // 关闭该 Channel
}
}

如果这里不处理异常,那它会被传递给下一个 ChannelHandler

编写引导服务器

  • 绑定到服务器将在其上监听并接受传入连接请求的端口;
  • 配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。

如下代码:

public class EchoServer {
private final int port = 8080;

public static void main(String[] args) {

}

public void start() throws InterruptedException {
// 创建一个之前写的 Handler 实例
final EchoServerHandler serverHandler = new EchoServerHandler();

EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
// 指定所使用的 NIO 传输 Channel(这里就是 TCP 连接)
.channel(NioServerSocketChannel.class)
// 指定使用的端口
.localAddress(new InetSocketAddress(port))
// 这个 ChannelInitializer 的作用:
// 当一个新的连接被接受时,一个新的子 Channel 将会被创建,而 ChannelInitializer
// 将会把一个 EchoServerHandler(下面)添加到子 Channel 的 ChannelPipeline 里面
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
// 因为这个 EchoServerHandler 被注解标识为了 @Sharable,所以可以使用相同的实例
ch.pipeline().addLast(serverHandler);
}
});
// 异步的绑定服务器;调用 sync 方法阻塞等待直到绑定完成(sync() 方法的调用将导致当前线程阻塞,一直到绑定操作完成为止)
ChannelFuture f = b.bind().sync();
// 获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
f.channel().closeFuture().sync(); // 该应用程序将会阻塞等待,直到服务器的 Channel 关闭
} finally {
// 关闭 EventLoopGroup 释放所有资源
group.shutdownGracefully().sync();
}
}
}

这里的 ChannelInitializer、EventLoopGroup 之后再讲,总之先回顾一下你刚完成的服务器实现中的重要步骤。下面这些是服务器的主要代码组件:

  • EchoServerHandler 实现了业务逻辑:
  • main() 方法引导了服务器;

引导过程中所需要的步骤如下:

  • 创建一个 ServerBootstrap 的实例以引导和绑定服务器
  • 创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据
  • 指定服务器绑定的本地的 InetSocketAddress
  • 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel
  • 调用 serverBootstrap.bind() 方法以绑定服务器。

在这个时候,服务器已经初始化,并且已经就绪能被使用了。

编写客户端示例

1、连接到服务器 2、发送一个或多个消息 3、对于每个消息,等待并接收从服务器发回的相同消息 4、关闭连接

编写客户端所涉及的两个主要代码部分也是业务逻辑和引导,和在服务器中看到的一样。

编写 ChannelHandler

如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler。

在这个场景下,可以扩展 SimpleChannelInboundHandler 类以处理所有必须的任务,要求重写下面的方法:

  • channelActive() 在到服务器的连接已经建立之后将被调用;
  • channelRead0() 当从服务器接收到一条消息时被调用
  • exceptionCaught() 在处理过程中引发异常时被调用。
@ChannelHandler.Sharable // 标识可以被多个共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

/**
* 这个方法会在一个连接建立时被调用,这确保了数据将会尽可能快的写入服务器
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 当被通知 Channel 是活跃时,发送一条数据
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", Charset.defaultCharset()));
}

/**
* 每当接收数据时都会调用这个方法接,需要注意的是:
* 由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了 5 字节,那么不能保证这 5 字节会被
* 一次性接收。即使是对于这么少量的数据,channelRead0() 方法也可能会被调用两次,例如:第一次使用一
* 个持有 3 字节的 ByteBuf (Netty 的字节容器),第二次使用一个持有 2 字节的 ByteBuf。作为一个面向
* 流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序被接收。
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 接收到消息
System.out.println("Client received: " + msg.toString(Charset.defaultCharset()));
}

/**
* 出现异常时会调用这个方法
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常时,记录错误,并关闭 Channel
cause.printStackTrace();
ctx.close();
}
}

一般用 Netty 来发送和接收数据都会继承 SimpleChannelInboundHandler 和 ChannelInboundHandlerAdapter 这两个抽象类,那么这两个到底有什么区别呢?

其实用这两个抽象类是有讲究的,在客户端的业务 Handler 继承的是 SimpleChannelInboundHandler,而在服务器端继承的是 ChannelInboundHandlerAdapter。

这个具体后面会讲

编写引导客户端

基本和服务端一致,除了这里使用的是 Bootstrap 进行引导,同时把传输的 Channel 换成了 NioSocketChannel

public class EchoClient {
private final String host = "127.0.0.1";
private final int port = 8080;

public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
// 这里使用的是 NioSocketChannel
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}

public static void main(String[] args) throws InterruptedException {
new EchoClient().start();
}
}

使用 JDK 写 OIO 和 NIO

回顾:原生编写一个 OIO

回顾一下 使用 JDK 写 OIO(阻塞 IO)是怎样的?

public class PlainOioServer {
public void server(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port);
try {
for (; ; ) {
final Socket clientSocket = socket.accept(); // 接收连接(这个 accept 是阻塞的)
System.out.println("Accepted Connection from " + clientSocket);
// 创建新线程来处理该连接
new Thread(() -> {
try (OutputStream out = clientSocket.getOutputStream()) {
out.write("Hi!\r\n".getBytes(StandardCharsets.UTF_8));
out.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close(); // 别忘了关闭连接
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}

}
}

但是并发量一起来,这样写就不行了(线程池也不行)

改写成原生 NIO 的形式

把上面 OIO 的代码改成 NIO 的形式,执行流程如下:

如下代码:

public class PlainNioServer {

public void server(int port) throws IOException {
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false); // 令其为非阻塞模式

ServerSocket ssocket = socketChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ssocket.bind(address); // 将服务器绑定到选定的端口
Selector selector = Selector.open(); // 打开一个 Selector 来处理 Channel
// 把这个选择器注册到 ACCEPT 事件上去(就是发生这个事件时会通知这个 Selector)
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 创建一个消息缓冲区
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes(StandardCharsets.UTF_8));
for(;;) {
try {
selector.select(); // 等待需要处理的事件(阻塞将一直维持到下一个传入的事件)
} catch (IOException ex) {
ex.printStackTrace();
break;
}

Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取所有接收事件的 SelectionKey 实例
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 处理该事件先移除它(否则会一直存在队列里面)
try {
// 检查该事件是否是已经就绪可以被接受的连接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept(); // 这里就可以直接从 ServerSocketChannel 取出这个客户端的连接
// 接受客户端,并将它注册到选择器,这个 msg.duplicate() 就是把上面创建的消息传入下一个 Writable 事件
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_WRITE, msg.duplicate());
System.out.println("Accented connection from " + client);
}
// 检查这个 Socket 是否准备好写数据了
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
// 这个 attachment 取得的就是取得上面 Acceptable 事件传入的 msg 消息
ByteBuffer buffer = (ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) { // 将数据写入已经连接的客户端
break;
}
}

client.close(); // 关闭连接
}
} catch (IOException e) {
key.channel().close(); // 发生异常则直接释放掉这个事件
e.printStackTrace();
}
}
}
}
}

这个 NIO 版本的代码和上面完全相同,但是却写的这么复杂,所以需要 Netty 来简化这些操作

通过 Netty 写 NIO

OIO 就不写了,因为 Netty 已经弃用了 OioEventLoopGroup,这里就直接讲 NIO 的那个

public class NettyOioServer {
public void server(int port) throws InterruptedException {
final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", Charset.defaultCharset());
// 为非阻塞模式使用 NioEventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 将消息写到客户端,并在写完后执行 future.channel().close();
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();// 绑定服务器接受连接
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); // 释放所有的资源
}
}
}